package com.github.ghsea.framework.job.client;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;

import com.github.ghsea.framework.job.common.JobExecuteResult;
import com.github.ghsea.framework.job.common.JobExecuteStatus;
import com.github.ghsea.framework.job.common.support.ExceptionUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class AsyncJobExecuteHandler implements JobExecuteHandler {
	private ExecutorService executor = null;

	private JobExecuteResultReportClient reportClient;

	AsyncJobExecuteHandler() {
		ThreadFactory tf = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SeaJob Client executor thread %d")
				.build();
		executor = Executors.newCachedThreadPool(tf);

		reportClient = new JobExecuteResultReportClient(new JobServerLocator(JobAppFactoryBean.getZkClient()));
	}

	@Override
	public JobExecuteResult handle(Job job, JobExecutionContext ctx) {
		JobExecuteResult ret = new JobExecuteResult();
		try {

			CompletableFuture.runAsync(new JobExecutorTask(job, ctx), executor)
					.whenCompleteAsync(new BiConsumer<Void, Throwable>() {
						@Override
						public void accept(Void t, Throwable ex) {
							JobExecuteResult ret = new JobExecuteResult();
							if (ex == null) {
								ret.setStatus(JobExecuteStatus.OK_EXECUTED.getCode());
							} else {
								ret.setStatus(JobExecuteStatus.ERROR_EXETUTED.getCode());
								ret.setException(ExceptionUtils.getStackTrace(ex));
							}
							
							reportClient.report(ret);
						}
					});
			ret.setStatus(JobExecuteStatus.OK_EXECUTING.getCode());
		} catch (Exception ex) {
			ret.setStatus(JobExecuteStatus.ERROR_EXETUTED.getCode());
			ret.setException(ExceptionUtils.getStackTrace(ex));
		}
		return ret;
	}

	private class JobExecutorTask implements Runnable {
		private Job job;
		private JobExecutionContext ctx;

		JobExecutorTask(final Job job, final JobExecutionContext ctx) {
			this.job = job;
			this.ctx = ctx;
		}

		@Override
		public void run() {
			job.execute(ctx);
		}
	}

}
